package com.soyuan;

import com.google.gson.Gson;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import com.google.gson.reflect.TypeToken;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
import com.mongodb.client.model.Sorts;
import com.soyuan.model.*;
import com.soyuan.util.*;
import org.bson.Document;
import org.bson.types.ObjectId;
import org.quartz.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.InputStream;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.*;

import static com.mongodb.client.model.Filters.*;
import static com.mongodb.client.model.Updates.set;

/**
 * Created by twx on 2017/10/31.
 */
public class QuartzJob implements Job {
    private static final Logger logger = LoggerFactory.getLogger(QuartzJob.class);

    private Gson gson = new Gson();

    private List<String> containerList,companyList,billNoList,goodsList;
    private List<OfficerDTO> officerList;

    MongoCollection<Document> exDataClt;
    MongoCollection<Document> actionClt;

    private String etlPushExUrl;
    private String etlPushAcUrl;
    private String etlIndexUrl;

    public void init() throws IOException {
        containerList = ReadFileUtil.read("container.txt");
        companyList = ReadFileUtil.read("company.txt");
        billNoList = ReadFileUtil.read("billNo.txt");
        goodsList = ReadFileUtil.read("goods.txt");
        officerList = OfficerDTO.getOfficers("officers.txt");

        exDataClt = MongoUtil.getCollection("exData");
        actionClt = MongoUtil.getCollection("actionData");

        Properties properties = new Properties();

        InputStream is = QuartzJob.class.getClassLoader().getResourceAsStream("url.properties");

        properties.load(is);

        etlPushExUrl = (String) properties.get("etl.pushExUrl");
        etlPushAcUrl = (String) properties.get("etl.pushAcUrl");
        etlIndexUrl = (String) properties.get("etl.indexUrl");

        is.close();
    }

    @Override
    public void execute(JobExecutionContext context) throws JobExecutionException {
        logger.info("开始Quartz Job...");
        try {
            init();
        } catch (IOException e) {
            e.printStackTrace();
        }

        boolean status = HttpClientUtil.testConn(etlIndexUrl);
        boolean mongoStatus = MongoUtil.connMongo();
        if (!status || !mongoStatus){
            try {
                context.getScheduler().shutdown();
                MongoUtil.close();
                logger.error("连接ETL状态："+status+" 连接MongoDB状态："+mongoStatus);
                logger.error("Quartz即将退出...");
            } catch (SchedulerException e) {
                logger.error("关闭Quartz :"+e.getMessage());
                e.printStackTrace();
            }
            return;
        }else{
            MongoUtil.close();
        }

        MongoCollection<Document> exDataClt = MongoUtil.getCollection("exData");
        MongoCollection<Document> actionClt = MongoUtil.getCollection("actionData");

        List<NodeField> exFields = null;//查验位列表
        try {
            exFields = ReadFileUtil.getExField("examField.csv");
            logger.info("查验场共"+exFields.size()+"个");
        } catch (IOException e) {
            e.printStackTrace();
        }
        //随机打乱查验位
        Collections.shuffle(exFields);
        logger.info("随机打乱查验位成功");
        //节点状态列表
        List<NodeStatus> nodeStatusList = NodeStatus.getNodeStatusList();

        List<Document> exDataIsNull = exDataClt.find(regex("sendTime", LocalDate.now().toString())).into(new ArrayList<>());
        logger.info("exData表今天的记录条数："+exDataIsNull.size());
        if (exDataIsNull.size() == 0) {
            logger.info("开始模拟制造今天的数据");
            //造一波数据
            LocalDateTime morning = LocalDateTime.now().withHour(8).withMinute(30);
            LocalDateTime afternoon = LocalDateTime.now().withHour(13).withMinute(30);

            LocalDateTime nowBase = LocalDateTime.now();
            for (int i=0;i<exFields.size();i++) {
                //服务开始时间 随机基准时间
                LocalDateTime beginTime;
                if (i < exFields.size() / 2) {
                    beginTime = morning;
                }else{
                    beginTime = afternoon;
                }
//                LocalDateTime beginTime = nowBase;

                //开始造业务数据
                NodeField field = exFields.get(i);//获取查验位
                BusinessDTO bdto = generatorBusinessData(field);
//                int randomMinus = RandomContainer.generatorInt(10,20);
//                int randomMinus = RandomContainer.generatorInt(1,2);
                int randomMinus = RandomContainer.generatorInt(2,120);

//                LocalDateTime businessSendTime = beginTime.plusSeconds(randomMinus);
                LocalDateTime businessSendTime = beginTime.plusMinutes(randomMinus);
                BusinessMongo bmongo = new BusinessMongo(businessSendTime.toString()
                        , false, bdto);
                //插入mongo业务表
                exDataClt.insertOne(Document.parse(gson.toJson(bmongo)));

                //开始造节点数据
                NodeDTO ndto = new NodeDTO(bdto.getContainerNo(), bdto.getExamineRecordNo(),
                        field.getExFieldCode(), field.getExFieldName(),
                        field.getLocationCode(), field.getSource());
                for (NodeStatus nodeStatus : nodeStatusList) {
                    int plusMinus = RandomContainer.generatorInt(nodeStatus.getMin(), nodeStatus.getMax());
                    businessSendTime = businessSendTime.plusMinutes(plusMinus);
//                    businessSendTime = businessSendTime.plusSeconds(plusMinus);

                    ndto.setTransferTime(businessSendTime.toString());
                    ndto.setStatusCode(nodeStatus.getCode());
                    ndto.setStatus(nodeStatus.getStatus());
                    ndto.setActionTime(businessSendTime.toString());

                    //存入mongo
                    NodeMongo nodeMongo = new NodeMongo(false, ndto);
                    Document nodeDoc = Document.parse(gson.toJson(nodeMongo));
                    actionClt.insertOne(nodeDoc);
                }
            }//end for 造数据
            logger.info("完成模拟制造今天的数据");
        }

        logger.info("开始轮询发送数据信息到ETL...");
        while (true) {
            LocalDateTime now = LocalDateTime.now();
            //该时间点前未发送的业务数据
            MongoCursor<Document> exDatas = exDataClt.find(and(lt("sendTime", now.toString()),
                    gt("sendTime",now.with(LocalTime.MIN).toString()),
                    eq("status", false))
            ).iterator();
            logger.info("开始发送业务数据到ETL...");
            int countExData = sendData2ETL(exDataClt, exDatas,etlPushExUrl);
            logger.info("本次共发送成功"+countExData+"个业务信息到ETL");

            //该时间节点前未发送的节点数据
            MongoCursor<Document> actionDatas = actionClt.find(and(
                    lt("data.actionTime", now.toString()),
                    gt("data.actionTime",now.with(LocalTime.MIN).toString()),
                    eq("status", false)))
                    .sort(Sorts.ascending("data.containerNo","data.examineRecordN"))
                    .iterator();
            logger.info("开始发送节点数据到ETL...");
            int countActionData = sendData2ETL(actionClt,actionDatas,etlPushAcUrl);
            logger.info("本次共发送"+countActionData+"个节点信息到ETL");


            //是否退出循环
            int hasNotSendExSize = exDataClt.find(
                    eq("status", false)).into(new ArrayList<>()).size();
            int hasNotSendAcSize = actionClt.find(
                    eq("status", false)).into(new ArrayList<>()).size();
            logger.debug("未发送的业务数据 : "+hasNotSendExSize+" 未发生的节点数据: "+hasNotSendAcSize);
            if (hasNotSendAcSize == 0 && hasNotSendExSize == 0) {
                logger.info("今天的数据已全部发送...终止循环");
                break;
            }

            try {
                Thread.sleep(1000*5);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }// end while
        logger.info("结束轮询发送数据信息到ETL...");
        logger.info("结束Quartz Job...");
        logger.info("关闭MongoDB 连接");
        MongoUtil.close();
    }

    private int sendData2ETL(MongoCollection<Document> collection, MongoCursor<Document> documents,String url) {
        int count = 0;
        int size=0;
        while (documents.hasNext()) {
            size++;
            Document next = documents.next();
            Object data = next.get("data");
            String doExResponse = HttpClientUtil.doPost(url, gson.toJson(data));
//
            if (doExResponse != null) {  //连不上服务器或者响应超时会返回空 或者 服务器返回的状态不是【200-300】，手动置空的
                JsonElement jsonElement = new JsonParser().parse(doExResponse);
                JsonObject jsonObj = jsonElement.getAsJsonObject();//
                String result = jsonObj.get("result").getAsString();
                if (result.equals("success")) {
                    count++;
                    ObjectId id = next.getObjectId("_id");
                    collection.updateOne(eq("_id", id), set("status", true));
                }else{
                    String errorDetails = jsonObj.get("errorDetails").getAsString();
                    logger.error("etl返回的错误信息："+errorDetails);
                }
            }
        }
        logger.info("本次轮询Mongo数据库找出的数据量共"+size+"条");
        return count;
    }

    /**
     * 构造业务数据（集装箱数据）
     * @return
     */
    public BusinessDTO generatorBusinessData(NodeField field){
        //集装箱号
        String container=RandomContainer.generatorContainerNo();
        String entryNo=RandomContainer.generator();
        String billNo=RandomList.getRandom(billNoList);
        String examineRecordNo=RandomContainer.generatorExamRecord();
        String good=RandomList.getRandom(goodsList);
        String company=RandomList.getRandom(companyList);

        OfficerDTO officerDTO1=RandomList.getRandomOfficer(officerList);
        OfficerDTO officerDTO2=RandomList.getRandomOfficer(officerList);

        BusinessDTO bto=new BusinessDTO(container,entryNo,billNo,examineRecordNo,company,LocalDateTime.now().toString());
        bto.setGood(good);
        bto.setExamineOfficer(officerDTO1);
        bto.setExamineOfficer(officerDTO2);
        bto.setExamFieldCode(field.getExFieldCode());
        bto.setExamSiteCode(field.getLocationCode());
        return bto;
    }
}
